package cn.ticsmyc.tools.multiThread.serialThreadconfinement;

import cn.ticsmyc.tools.multiThread.twoPhaseTermination.AbstractTerminatableThread;

import java.util.concurrent.*;

/**
 * Worker thread built on the two-phase termination support of
 * {@link AbstractTerminatableThread}.
 *
 * <p>Tasks passed to {@code submit} are wrapped in a {@link FutureTask}, placed on the
 * blocking queue, and executed one at a time by {@code doRun} on this thread. Every task that
 * is queued but not yet finished is counted in {@code terminationToken.reservations}.
 *
 * @param <T> type of the result produced by the task processor
 * @param <V> type of the task input handed to the task processor
 * @author Ticsmyc
 * @date 2021-06-03 16:07
 */
public class TerminatableWorkerThread<T, V> extends AbstractTerminatableThread {

    /**
     * Holds the tasks that are waiting to be executed by this thread.
     */
    private final BlockingQueue<Runnable> blockingQueue;

    /**
     * Processor that performs the actual work for each submitted task.
     */
    private final TaskProcessor<T, V> taskProcessor;

    /**
     * Creates a worker thread.
     *
     * @param blockingQueue queue used to hand tasks over to this thread
     * @param taskProcessor processor invoked for every submitted task
     */
    public TerminatableWorkerThread(BlockingQueue<Runnable> blockingQueue, TaskProcessor<T, V> taskProcessor) {
        this.blockingQueue = blockingQueue;
        this.taskProcessor = taskProcessor;
    }

    /**
     * Submits a task for execution on this thread.
     *
     * <p>May block while the queue has no free capacity.
     *
     * @param task input passed to {@code taskProcessor.doProcess}
     * @return a future that completes with the processor's result for {@code task}
     * @throws ExecutionException   never thrown by this method itself; kept in the signature so
     *                              existing callers keep compiling
     * @throws InterruptedException if the calling thread is interrupted while waiting for space
     *                              in the queue; the task is not queued in that case
     */
    public Future<T> submit(final V task) throws ExecutionException, InterruptedException {
        Callable<T> callableTask = () -> taskProcessor.doProcess(task);
        FutureTask<T> futureTask = new FutureTask<>(callableTask);

        // Reserve BEFORE enqueueing: there must never be a moment where a task sits in the
        // queue while the reservation counter still reads zero, and the worker must never be
        // able to decrement for a task whose increment has not happened yet.
        this.terminationToken.reservations.incrementAndGet();
        boolean enqueued = false;
        try {
            blockingQueue.put(futureTask);
            enqueued = true;
        } finally {
            if (!enqueued) {
                // put() failed (interrupt, null-hostile queue, ...): undo the reservation so
                // the counter keeps matching the number of outstanding tasks.
                this.terminationToken.reservations.decrementAndGet();
            }
        }
        return futureTask;
    }

    /**
     * Takes one task from the queue and runs it on this thread.
     *
     * <p>The reservation counter is decremented only when a task was actually taken, so an
     * interrupted wait on an empty queue does not corrupt the count.
     */
    @Override
    protected void doRun() {
        final Runnable task;
        try {
            task = blockingQueue.take();
        } catch (InterruptedException e) {
            // No task was obtained, so there is nothing to un-reserve. The interrupt is treated
            // as a wake-up signal only; the flag is deliberately not re-asserted because a set
            // flag would make every following take() fail immediately.
            // NOTE(review): assumes the parent's run loop re-checks the termination token
            // between doRun() calls - confirm against AbstractTerminatableThread.
            return;
        }

        try {
            task.run();
        } catch (RuntimeException e) {
            // A failing task must not kill the worker thread; report and carry on.
            e.printStackTrace();
        } finally {
            // Exactly one decrement per task taken from the queue.
            this.terminationToken.reservations.decrementAndGet();
        }
    }
}
